import contextlib
import ssl
from abc import ABC, abstractmethod
from collections.abc import Generator
from dataclasses import dataclass, field
from enum import Enum
from json import dumps
from pathlib import Path
from time import sleep
from types import TracebackType
from typing import TYPE_CHECKING, Any, Literal, TypedDict

import httpx
import websocket
from httpx import Response

from src.utils import data as setting
from src.utils import file, tool
from src.utils.decorator import singleton

# 导入必要的模块
if TYPE_CHECKING:
	from src.utils.data import SettingManager


# ==================== 配置类 ====================
@dataclass
class ClientConfig:
	"""客户端配置"""

	base_url: str = "https://api.codemao.cn"
	timeout: float = 30.0
	max_retries: int = 3
	retry_delay: float = 1.0
	log_requests: bool = True


class HTTPStatus(Enum):
	"""HTTP状态码枚举"""

	CREATED = 201
	FORBIDDEN = 403
	NOT_FOUND = 404
	NOT_MODIFIED = 304
	NO_CONTENT = 204
	OK = 200


class PaginationConfig(TypedDict, total=False):
	"""分页配置"""

	amount_key: str
	offset_key: str
	response_amount_key: str
	response_offset_key: str


# ==================== 类型定义 ====================
HttpMethod = Literal["GET", "POST", "DELETE", "PATCH", "PUT", "HEAD"]
FetchMethod = Literal["GET", "POST"]


# ==================== 接口定义 ====================
class IHTTPClient(ABC):
	"""HTTP客户端接口"""

	@abstractmethod
	def send_request(self, method: str, endpoint: str) -> Response:
		"""发送HTTP请求"""

	@abstractmethod
	def update_headers(self, headers: dict[str, str]) -> None:
		"""更新请求头"""

	@abstractmethod
	def fetch_paginated_data(
		self,
		endpoint: str,
		params: dict[str, Any],
		payload: dict[str, Any] | None = None,
		method: FetchMethod = "GET",
		limit: int | None = None,
		total_key: str = "total",
		data_key: str = "items",
		pagination_method: Literal["offset", "page"] = "offset",
		config: PaginationConfig | None = None,
	) -> Generator[dict[str, Any]]:
		"""获取分页数据"""

	@abstractmethod
	def get_pagination_total(
		self,
		endpoint: str,
		params: dict[str, Any],
		payload: dict[str, Any] | None = None,
		fetch_method: FetchMethod = "GET",
		total_key: str = "total",
		data_key: str = "items",
		config: PaginationConfig | None = None,
	) -> dict[Literal["total", "total_pages"], int]:
		"""获取分页总数"""


class IWebSocketClient(ABC):
	"""WebSocket客户端接口"""

	@abstractmethod
	def connect(self, url: str) -> bool:
		"""连接WebSocket"""

	@abstractmethod
	def disconnect(self) -> None:
		"""断开连接"""

	@abstractmethod
	def send(self, message: str | dict[str, Any]) -> bool:
		"""发送消息"""

	@abstractmethod
	def receive(self, timeout: float = 30.0) -> str | bytes | None:
		"""接收消息"""

	@abstractmethod
	def listen(self) -> Generator[str | bytes]:
		"""监听消息"""


class IFileUploader(ABC):
	"""文件上传器接口"""

	@abstractmethod
	def upload(self, file_path: Path, method: str, save_path: str = "aumiao") -> str:
		"""上传文件"""


# ==================== 身份管理器 ====================
@dataclass
class Token:
	"""Token管理"""

	average: str = field(default="", metadata={"track": True})
	edu: str = field(default="", metadata={"track": False})
	judgement: str = field(default="", metadata={"track": True})
	blank: str = field(default="", metadata={"track": False})

	def __setattr__(self, name: str, value: str) -> None:
		"""属性设置监听"""
		if hasattr(self, name) and hasattr(self.__class__, name):
			field_meta = self.__dataclass_fields__[name].metadata
			if field_meta.get("track", False):
				old_value = getattr(self, name)
				if old_value != value:
					print(f"属性 '{name}' 已修改: {old_value[:10]!r}... → {value[:10]!r}...")
		super().__setattr__(name, value)


class IdentityManager:
	"""身份管理器 - 修复版本"""

	def __init__(self) -> None:
		self.tokens = Token()
		self._current_identity = "blank"
		self._token_map = {"average": "average", "edu": "edu", "judgement": "judgement", "blank": "blank"}
		self._backup_tokens = {}  # 添加令牌备份机制

	def switch_identity(self, identity: str, token: str) -> None:
		"""切换身份 - 修复版本"""
		if identity not in self._token_map:
			error_msg = f"无效的身份: {identity}"
			raise ValueError(error_msg)
		# 备份当前令牌(如果非空)
		current_token = self.get_current_token()
		if current_token and self._current_identity != "blank":
			self._backup_tokens[self._current_identity] = current_token
		# 设置新令牌
		if token and token.strip():
			setattr(self.tokens, self._token_map[identity], token)
			self._current_identity = identity
		else:
			print(f"警告:尝试设置空令牌到身份 {identity}")

	def restore_identity(self, identity: str) -> bool:
		"""恢复特定身份的令牌"""
		if identity in self._backup_tokens:
			token = self._backup_tokens[identity]
			if token and token.strip():
				setattr(self.tokens, self._token_map[identity], token)
				self._current_identity = identity
				print(f"已恢复身份: {identity}")
				return True
		return False

	def backup_current_token(self) -> None:
		"""备份当前令牌"""
		if self._current_identity != "blank":
			current_token = self.get_current_token()
			if current_token:
				self._backup_tokens[self._current_identity] = current_token

	def get_current_token(self) -> str:
		"""获取当前token - 修复版本"""
		token = getattr(self.tokens, self._token_map[self._current_identity])
		return token or ""  # 确保返回字符串,避免 None

	def get_identity_headers(self) -> dict[str, str]:
		"""获取身份认证头 - 修复版本"""
		token = self.get_current_token()
		if not token or not token.strip():
			print(f"警告:身份 '{self._current_identity}' 的令牌为空,无法生成认证头")
			return {}
		return {"Authorization": f"Bearer {token}"}

	@property
	def current_identity(self) -> str:
		"""获取当前身份"""
		return self._current_identity


# ==================== 基础实现 ====================
class BaseHTTPClient:
	"""基础HTTP客户端 - 优化版"""

	_DEFAULT_PAGE_SIZE = 15
	_MIN_PAGE_SIZE = 1

	def __init__(self, config: ClientConfig) -> None:
		self.config = config
		self.headers = setting.SettingManager().data.PROGRAM.HEADERS.copy()
		self._http_client = httpx.Client(headers=self.headers, timeout=config.timeout)
		self._file_handler = file.CodeMaoFile()
		self._data_processor = tool.DataProcessor()
		self.log_file = Path.cwd() / "logs" / f"requests_{tool.TimeUtils().current_timestamp()}.txt"
		self._pagination_config: PaginationConfig = {}

	def send_request(
		self,
		method: str,
		endpoint: str,
		params: dict[str, Any] | None = None,
		data: dict[str, Any] | None = None,
		payload: dict[str, Any] | None = None,
		files: dict[str, Any] | None = None,
		headers: dict[str, str] | None = None,
		retries: int | None = 1,
		backoff_factor: float = 0.3,
		timeout: float | None = None,
		*,
		log: bool = True,
	) -> Response:
		"""统一的HTTP请求方法"""
		url = endpoint if endpoint.startswith("http") else f"{self.config.base_url}{endpoint}"
		retries = retries or self.config.max_retries
		timeout = timeout or self.config.timeout
		log_enabled = bool(self.config.log_requests and log)
		for attempt in range(retries):
			try:
				request_headers = self._prepare_headers(headers, files)
				response = self._execute_request(method=method, url=url, params=params, data=data, payload=payload, files=files, headers=request_headers, timeout=timeout)
				if log_enabled:
					self._log_request(response)
				response.raise_for_status()
			except httpx.HTTPStatusError as e:
				if attempt == retries - 1:
					return e.response
				self._handle_retry(e, attempt)
			except (httpx.ConnectError, httpx.TimeoutException) as e:
				if attempt == retries - 1:
					raise
				self._handle_retry(e, attempt)
			except Exception as e:
				print(f"请求失败: {e}")
				break
			else:
				return response
			sleep(self.config.retry_delay * (2**attempt * backoff_factor))
		return Response(500)

	def _prepare_headers(self, headers: dict[str, str] | None, files: dict[str, Any] | None) -> dict[str, str]:
		"""准备请求头 - 修复版本"""
		# 合并基础头和新头
		request_headers = {**self._http_client.headers, **(headers or {})}
		# 检查 Authorization 头是否为空
		auth_header = request_headers.get("Authorization", "")
		if auth_header and (not auth_header.strip() or auth_header == "Bearer "):
			print("警告:Authorization 头为空,移除该头")
			request_headers.pop("Authorization", None)
		# 处理文件上传时的头
		if files:
			request_headers.pop("Content-Type", None)
			request_headers.pop("Content-Length", None)
		return request_headers

	def _execute_request(
		self,
		method: str,
		url: str,
		params: dict[str, Any] | None,
		data: dict[str, Any] | None,
		payload: dict[str, Any] | None,
		files: dict[str, Any] | None,
		headers: dict[str, str],
		timeout: float,
	) -> Response:
		"""执行HTTP请求"""
		request_args: dict[str, Any] = {"method": method.upper(), "url": url, "params": params, "headers": headers, "timeout": timeout}
		if files:
			request_args.update({"data": data, "files": files})
		else:
			request_args["json"] = payload
		return self._http_client.request(**request_args)

	@staticmethod
	def _handle_retry(error: Exception, attempt: int) -> None:
		"""处理重试逻辑"""
		print(f"请求失败,第 {attempt + 1} 次重试: {error}")

	def update_headers(self, headers: dict[str, str]) -> None:
		"""更新请求头 - 修复版本"""
		# 过滤空值头
		valid_headers = {k: v for k, v in headers.items() if v and v.strip()}
		self._http_client.headers.update(valid_headers)

	def _merge_pagination_config(self, config: PaginationConfig | None) -> PaginationConfig:
		"""合并分页配置"""
		if config is None:
			return self._pagination_config
		return {**self._pagination_config, **config}

	def _prepare_pagination_params(self, params: dict[str, Any], amount_key: str, *, include_first_page: bool) -> dict[str, Any]:
		"""准备分页请求参数"""
		request_params = params.copy()
		# 如果不包含第一页数据,使用较小页面大小快速获取元数据
		if not include_first_page and amount_key in request_params:
			request_params[amount_key] = self._DEFAULT_PAGE_SIZE
		return request_params

	def _safe_extract_total(self, data: dict[str, Any], total_key: str) -> int:
		"""安全提取总数"""
		total_raw = self._get_nested_value(data, total_key)
		try:
			return int(total_raw) if total_raw is not None else 0
		except (ValueError, TypeError):
			return 0

	def _calculate_items_per_page(self, response_data: dict[str, Any], request_params: dict[str, Any], config: PaginationConfig) -> int:
		"""计算每页项目数"""
		# 优先级: 请求参数 > 响应参数 > 默认值
		amount_key = config.get("amount_key", "")
		response_amount_key = config.get("response_amount_key", "")
		items_per_page = request_params.get(amount_key) or response_data.get(response_amount_key) or self._DEFAULT_PAGE_SIZE
		return max(items_per_page, self._MIN_PAGE_SIZE)

	def _extract_first_page(self, response_data: dict[str, Any], data_key: str, *, include_first_page: bool) -> list[dict[str, Any]]:
		"""提取第一页数据"""
		if not include_first_page:
			return []
		first_page_raw = self._get_nested_value(response_data, data_key)
		return first_page_raw if isinstance(first_page_raw, list) else []

	def _get_pagination_info(
		self,
		endpoint: str,
		params: dict[str, Any],
		payload: dict[str, Any] | None = None,
		fetch_method: FetchMethod = "GET",
		total_key: str = "total",
		data_key: str = "items",
		config: PaginationConfig | None = None,
		*,
		include_first_page: bool = False,
	) -> tuple[int, int, list[dict[str, Any]], dict[str, Any]]:
		"""获取分页信息 - 优化版"""
		# 合并配置
		config_ = self._merge_pagination_config(config)
		# 准备请求参数
		amount_key = config_.get("amount_key", "")
		request_params = self._prepare_pagination_params(params=params, amount_key=amount_key, include_first_page=include_first_page)
		# 发送请求
		response = self.send_request(fetch_method, endpoint, params=request_params, payload=payload)
		if response.status_code != HTTPStatus.OK.value:
			return 0, 0, [], {}
		response_data = response.json()
		# 提取关键信息
		total_items = self._safe_extract_total(response_data, total_key)
		items_per_page = self._calculate_items_per_page(response_data, request_params, config_)
		first_page = self._extract_first_page(response_data=response_data, data_key=data_key, include_first_page=include_first_page)
		return total_items, items_per_page, first_page, response_data

	@staticmethod
	def _get_nested_value(data: dict[str, Any], key: str) -> Any | None:  # noqa: ANN401
		"""获取嵌套值"""
		if not key or not isinstance(data, dict):
			return None
		keys = key.split(".")
		current = data
		for k in keys:
			if isinstance(current, dict) and k in current:
				current = current[k]
			else:
				return None
		return current

	@staticmethod
	def _reached_limit(current_count: int, limit: int | None) -> bool:
		"""检查是否达到限制"""
		return limit is not None and current_count >= limit

	@staticmethod
	def _calculate_remaining_items(total_items: int, first_page_count: int, limit: int | None, yielded_count: int) -> int:
		"""计算剩余需要获取的项目数"""
		remaining_from_total = total_items - first_page_count
		if limit is None:
			return remaining_from_total
		return min(remaining_from_total, limit - yielded_count)

	@staticmethod
	def _build_page_params(
		base_params: dict[str, Any],
		offset_key: str,
		page_idx: int,
		items_per_page: int,
		first_page_size: int,
		pagination_method: Literal["offset", "page"],
	) -> dict[str, Any]:
		"""构建页面请求参数"""
		page_params = base_params.copy()
		if pagination_method == "offset":
			page_params[offset_key] = first_page_size + (page_idx - 1) * items_per_page
		elif pagination_method == "page":
			page_params[offset_key] = page_idx + 1  # 第一页已经获取,从第二页开始
		else:
			error_msg = f"不支持的分页方式: {pagination_method}"
			raise ValueError(error_msg)
		return page_params

	def _fetch_single_page(
		self,
		endpoint: str,
		method: FetchMethod,
		params: dict[str, Any],
		payload: dict[str, Any] | None,
		data_key: str,
	) -> list[dict[str, Any]]:
		"""获取单个页面的数据"""
		response = self.send_request(method, endpoint, params=params, payload=payload)
		if response.status_code != HTTPStatus.OK.value:
			return []
		page_data_raw = self._get_nested_value(response.json(), data_key)
		return page_data_raw if isinstance(page_data_raw, list) else []

	def _fetch_remaining_pages(
		self,
		endpoint: str,
		base_params: dict[str, Any],
		payload: dict[str, Any] | None,
		method: FetchMethod,
		data_key: str,
		pagination_method: Literal["offset", "page"],
		config: PaginationConfig,
		items_per_page: int,
		first_page_size: int,
		remaining_to_fetch: int,
		current_count: int,
		limit: int | None,
	) -> Generator[dict[str, Any]]:
		total_pages = (remaining_to_fetch + items_per_page - 1) // items_per_page
		yielded_count = current_count
		offset_key = config.get("offset_key", "")
		for page_idx in range(1, total_pages + 1):
			page_params = self._build_page_params(base_params, offset_key, page_idx, items_per_page, first_page_size, pagination_method)
			page_data = self._fetch_single_page(endpoint, method, page_params, payload, data_key)
			if not page_data:
				continue
			for item in page_data:
				yield item
				yielded_count += 1
				if self._reached_limit(yielded_count, limit):
					return

	def fetch_paginated_data(
		self,
		endpoint: str,
		params: dict[str, Any],
		payload: dict[str, Any] | None = None,
		method: FetchMethod = "GET",
		limit: int | None = None,
		total_key: str = "total",
		data_key: str = "items",
		pagination_method: Literal["offset", "page"] = "offset",
		config: PaginationConfig | None = None,
	) -> Generator[dict[str, Any]]:
		# 获取分页信息
		total_items, items_per_page, first_page, _ = self._get_pagination_info(
			endpoint=endpoint, params=params, payload=payload, fetch_method=method, total_key=total_key, data_key=data_key, config=config, include_first_page=True
		)
		config_ = self._merge_pagination_config(config)
		base_params = params.copy()
		# 生成第一页数据
		yielded_count = 0
		for item in first_page:
			yield item
			yielded_count += 1
			if self._reached_limit(yielded_count, limit):
				return
		# 计算剩余需要获取的数据
		remaining_to_fetch = self._calculate_remaining_items(total_items, len(first_page), limit, yielded_count)
		if remaining_to_fetch <= 0:
			return
		# 分批获取剩余数据
		yield from self._fetch_remaining_pages(
			endpoint=endpoint,
			base_params=base_params,
			payload=payload,
			method=method,
			data_key=data_key,
			pagination_method=pagination_method,
			config=config_,
			items_per_page=items_per_page,
			first_page_size=len(first_page),
			remaining_to_fetch=remaining_to_fetch,
			current_count=yielded_count,
			limit=limit,
		)

	@staticmethod
	def _calculate_total_pages(total_items: int, items_per_page: int) -> int:
		"""计算总页数"""
		if total_items == 0:
			return 0
		return (total_items + items_per_page - 1) // items_per_page

	def get_pagination_total(
		self,
		endpoint: str,
		params: dict[str, Any],
		payload: dict[str, Any] | None = None,
		fetch_method: FetchMethod = "GET",
		total_key: str = "total",
		data_key: str = "items",
		config: PaginationConfig | None = None,
	) -> dict[Literal["total", "total_pages"], int]:
		"""获取分页总数 - 优化版"""
		total_items, items_per_page, _, _ = self._get_pagination_info(endpoint, params, payload, fetch_method, total_key, data_key, config, include_first_page=False)
		total_pages = self._calculate_total_pages(total_items, items_per_page)
		return {"total": total_items, "total_pages": total_pages}

	def _log_request(self, response: Response) -> None:
		"""记录请求日志"""
		log_entry = (
			f"[{tool.TimeUtils().format_timestamp()}]\n"
			f"Method: {response.request.method}\n"
			f"URL: {response.url}\n"
			f"Status: {response.status_code}\n"
			f"Response: {response.text}\n"
			f"{'=' * 50}\n\n"
		)
		self._file_handler.file_write(path=self.log_file, content=log_entry, method="a")

	def close(self) -> None:
		"""关闭HTTP客户端"""
		self._http_client.close()

	def __enter__(self) -> "BaseHTTPClient":
		return self

	def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None:
		self.close()


# ==================== 具体实现 ====================
@singleton
class CodeMaoClient(BaseHTTPClient):
	"""编程猫HTTP客户端 - 修复版本"""

	def __init__(self) -> None:
		setting_manager: SettingManager = setting.SettingManager()
		config = ClientConfig(log_requests=setting_manager.data.PARAMETER.log)
		super().__init__(config)
		# 修复:只创建一个 IdentityManager 实例
		self.identity_manager = IdentityManager()
		self.token = self.identity_manager.tokens  # 使用同一个实例
		# 初始化时设置默认请求头
		self._initialize_default_headers()

	def _initialize_default_headers(self) -> None:
		"""初始化默认请求头"""
		# 确保初始请求头正确设置
		default_headers = setting.SettingManager().data.PROGRAM.HEADERS.copy()
		self.update_headers(default_headers)

	def switch_identity(self, identity: str, token: str) -> None:
		"""切换身份并更新请求头 - 修复版本"""
		if not token or not token.strip():
			print(f"警告:尝试为身份 '{identity}' 设置空令牌")
			return
		# 验证身份类型
		valid_identities = ["average", "edu", "judgement", "blank"]
		if identity not in valid_identities:
			print(f"错误:无效的身份类型 '{identity}',有效身份:{valid_identities}")
			return
		try:
			# 使用身份管理器切换身份
			self.identity_manager.switch_identity(identity, token)
			# 获取身份认证头并更新
			identity_headers = self.identity_manager.get_identity_headers()
			if identity_headers and identity_headers.get("Authorization"):
				self.update_headers(identity_headers)
				print(f"已切换到身份: {identity}")
			else:
				print(f"警告:身份 '{identity}' 的认证头为空")
		except Exception as e:
			print(f"切换身份失败: {e}")


class CodeMaoWebSocketClient(IWebSocketClient):
	"""编程猫WebSocket客户端 - 修复版本"""

	def __init__(self) -> None:
		self._ws_app = None
		self._connected = False
		self._message_queue = []

	def connect(self, url: str) -> bool:
		"""连接WebSocket"""
		try:
			self._ws_app = websocket.create_connection(
				url,
				header={
					"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0",
					"Origin": "https://kn.codemao.cn",
					"Accept-Encoding": "gzip, deflate, br, zstd",
					"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
					"Cache-Control": "no-cache",
					"Pragma": "no-cache",
				},
				sslopt={"cert_reqs": ssl.CERT_NONE},
			)
			self._connected = True
			print(f"WebSocket连接已建立: {url}")
		except Exception as e:
			print(f"WebSocket连接失败: {e}")
			self._connected = False
			return False
		else:
			return True

	def disconnect(self) -> None:
		"""断开WebSocket连接"""
		if self._ws_app:
			with contextlib.suppress(Exception):
				self._ws_app.close()
			self._ws_app = None
		self._connected = False
		print("WebSocket连接已断开")

	def send(self, message: str | dict[str, Any]) -> bool:
		"""发送WebSocket消息"""
		if not self._ws_app or not self._connected:
			print("WebSocket未连接")
			return False
		try:
			if isinstance(message, dict):
				message = dumps(message, ensure_ascii=False)
			self._ws_app.send(message)
		except Exception as e:
			print(f"发送WebSocket消息失败: {e}")
			return False
		else:
			return True

	def receive(self, timeout: float = 30.0) -> str | bytes | None:
		"""接收WebSocket消息"""
		if not self._ws_app or not self._connected:
			return None
		try:
			self._ws_app.settimeout(timeout)
			message = self._ws_app.recv()
		except websocket.WebSocketTimeoutException:
			print("接收WebSocket消息超时")
			return None
		except Exception as e:
			print(f"接收WebSocket消息失败: {e}")
			return None
		else:
			return message

	def listen(self) -> Generator[str | bytes]:
		while self._connected and self._ws_app:
			try:
				message = self.receive(timeout=1.0)
				if message is not None:
					yield message
			except Exception:
				break

	@property
	def connected(self) -> bool:
		return self._connected

	def close(self) -> None:
		self.disconnect()

	def __enter__(self) -> "CodeMaoWebSocketClient":
		return self

	def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None:
		self.close()


class FileUploader(IFileUploader):
	"""文件上传器 - 整合原版上传逻辑"""

	def __init__(self) -> None:
		self.client = CodeMaoClient()
		self._upload_strategies = {
			"pgaot": self._upload_pgaot,
			"codegame": self._upload_codegame,
			"codemao": self._upload_codemao,
		}
		# 为文件上传创建独立session避免影响主会话
		self._upload_session = httpx.Client()

	def upload(self, file_path: Path, method: str, save_path: str = "aumiao") -> str:
		"""上传文件"""
		if method not in self._upload_strategies:
			error_msg = f"不支持的上传方式: {method}"
			raise ValueError(error_msg)
		return self._upload_strategies[method](file_path, save_path)

	def _upload_pgaot(self, file_path: Path, save_path: str) -> str:
		"""Pgaot上传"""
		with file_path.open("rb") as f:
			files = {"file": (file_path.name, f)}
			data = {"path": save_path}
			response = self._upload_request("POST", "https://api.pgaot.com/user/up_cat_file", files=files, data=data)
		return response.json()["url"]

	def _upload_codegame(self, file_path: Path, save_path: str) -> str:
		"""CodeGame上传"""
		token_info = self._get_codegame_token(save_path, file_path)
		with file_path.open("rb") as f:
			files = {"file": (file_path.name, f)}
			data = {
				"token": token_info["token"],
				"key": token_info["file_path"],
				"fname": "avatar",
			}
			response = self._upload_request("POST", token_info["upload_url"], files=files, data=data)
		result = response.json()
		return f"{token_info['pic_host']}/{result['key']}"

	def _upload_codemao(self, file_path: Path, save_path: str) -> str:
		"""CodeMao上传"""
		unique_name = f"{save_path}/{file_path.name}"
		token_info = self._get_codemao_token(unique_name)
		with file_path.open("rb") as f:
			files = {"file": (file_path.name, f)}
			data = {
				"token": token_info["token"],
				"key": token_info["file_path"],
				"fname": file_path.name,
			}
			self._upload_request("POST", token_info["upload_url"], files=files, data=data)
		return token_info["pic_host"] + token_info["file_path"]

	def _upload_request(self, method: str, endpoint: str, files: dict[str, Any] | None = None, data: dict[str, Any] | None = None, timeout: float = 120.0) -> Response:
		"""专门用于文件上传的请求方法"""
		headers = setting.SettingManager().data.PROGRAM.HEADERS.copy()
		if files:
			headers.pop("Content-Type", None)
			headers.pop("Content-Length", None)
		request_args: dict[str, Any] = {
			"method": method,
			"url": endpoint,
			"headers": headers,
			"timeout": timeout,
		}
		if files:
			request_args.update({"data": data, "files": files})
		else:
			request_args["json"] = data
		response = self._upload_session.request(**request_args)
		response.raise_for_status()
		return response

	def _get_codemao_token(self, file_path: str) -> dict[str, Any]:
		"""获取CodeMao上传token"""
		params = {
			"projectName": "community_frontend",
			"filePaths": file_path,
			"filePath": file_path,
			"tokensCount": 1,
			"fileSign": "p1",
			"cdnName": "qiniu",
		}
		response = self.client.send_request("GET", "https://open-service.codemao.cn/cdn/qi-niu/tokens/uploading", params=params)
		data = response.json()
		return {
			"token": data["tokens"][0]["token"],
			"file_path": data["tokens"][0]["file_path"],
			"upload_url": data["upload_url"],
			"pic_host": data["bucket_url"],
		}

	def _get_codegame_token(self, prefix: str, file_path: Path) -> dict[str, Any]:
		"""获取CodeGame上传token"""
		params = {"prefix": prefix, "bucket": "static", "type": file_path.suffix}
		response = self.client.send_request("GET", "https://oversea-api.code.game/tiger/kitten/cdn/token/1", params=params)
		data = response.json()
		return {
			"token": data["data"][0]["token"],
			"file_path": data["data"][0]["filename"],
			"pic_host": data["bucket_url"],
			"upload_url": "https://upload.qiniup.com",
		}

	def close(self) -> None:
		"""关闭上传会话"""
		self._upload_session.close()


# ==================== 工厂类 ====================
class ClientFactory:
	"""客户端工厂"""

	@staticmethod
	def create_http_client(config: ClientConfig | None = None) -> BaseHTTPClient:
		"""创建HTTP客户端"""
		config = config or ClientConfig()
		return BaseHTTPClient(config)

	@staticmethod
	def create_codemao_client() -> CodeMaoClient:
		"""创建编程猫HTTP客户端"""
		return CodeMaoClient()

	@staticmethod
	def create_websocket_client() -> CodeMaoWebSocketClient:
		"""创建WebSocket客户端"""
		return CodeMaoWebSocketClient()

	@staticmethod
	def create_codemao_websocket_client() -> CodeMaoWebSocketClient:
		"""创建编程猫WebSocket客户端"""
		return CodeMaoWebSocketClient()

	@staticmethod
	def create_file_uploader() -> FileUploader:
		"""创建文件上传器"""
		return FileUploader()
